Skip to content

[bugfix] KafkaPartitionLevelConsumer: stop re-seeking past read_committed-filtered batches#18337

Open
xiangfu0 wants to merge 12 commits intoapache:masterfrom
xiangfu0:fix/exactly-once-kafka-flake-recovery
Open

[bugfix] KafkaPartitionLevelConsumer: stop re-seeking past read_committed-filtered batches#18337
xiangfu0 wants to merge 12 commits intoapache:masterfrom
xiangfu0:fix/exactly-once-kafka-flake-recovery

Conversation

@xiangfu0
Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 commented Apr 26, 2026

Summary

This is a bug fix in KafkaPartitionLevelConsumer (both pinot-kafka-3.0 and pinot-kafka-4.0) plus a redundant fsync removal in ExactlyOnceKafkaRealtimeClusterIntegrationTest. The integration-test flake that surfaced these issues is the reason this PR exists, but the actual changes are production fixes.

What's broken in production today

KafkaPartitionLevelConsumer.fetchMessages() re-seeks to the caller's startOffset on every call whenever the consumer's tracked offset doesn't match startOffset - 1. Two problems with that:

  1. The state was tracked across two fields (_lastFetchedOffset = -1 initially, plus a separately-introduced _lastSeekedStartOffset) that encoded overlapping meanings.
  2. With read_committed isolation, an empty poll legitimately advances the broker-side position past startOffset while the application consumer (RealtimeSegmentDataManager) does not advance _currentOffset on empty batches — its only advancement path is per-record in processStreamEvents, which doesn't run when messageCount == 0. So:
    • The consumer's internal Kafka position moves to N (past the aborted region).
    • The caller calls fetchMessages again with the same startOffset = 0.
    • The seek-check _lastFetchedOffset != startOffset - 1 (= N-1 != -1) is trueseek back to 0.
    • The next poll re-reads the same aborted records, returns empty, advance again, repeat. Forever.

This affects any production user with stream.kafka.isolation.level = read_committed consuming a topic that has aborted transactional batches at or before the current checkpoint offset (e.g., a producer that aborted a transaction, then committed the next one, then a server restart resumes from a ZK checkpoint inside the aborted region). The realtime consuming segment silently stops making forward progress; COUNT(*) stays frozen with no errors logged.

Behaviour change (call-out)

KafkaPartitionLevelConsumer.fetchMessages() now returns _nextReadOffset (the consumer's actual next-to-read position) as KafkaMessageBatch.offsetOfNextBatch even when the batch is empty, where the previous implementation always returned the caller-supplied startOffset for empty batches. Behaviourally:

  • For read_uncommitted (default) consumers, an empty poll means the topic genuinely has no records past the current offset — currentPosition stays at startOffset, _nextReadOffset stays at startOffset, no observable change.
  • For read_committed consumers, an empty poll past an aborted batch now correctly advances offsetOfNextBatch to the post-aborted-region offset, which is the fix.

KafkaMessageBatch.firstOffset and KafkaMessageBatch.hasDataLoss semantics are unchanged.

Changes

pinot-kafka-3.0 and pinot-kafka-4.0

  • Replace _lastFetchedOffset + _lastSeekedStartOffset with a single _nextReadOffset field (per @noob-se7en's suggestion).
  • Seek-check is now: seek only if _nextReadOffset < 0 (uninitialised) or startOffset > _nextReadOffset (caller advanced past us). Notably, startOffset <= _nextReadOffset is treated as "we're already at or past where the caller wants" → no seek. This is the case that matters for read_committed empty polls.
  • After non-empty fetch: _nextReadOffset = lastRecord.offset + 1.
  • After empty fetch: snap _nextReadOffset up to KafkaConsumer.position(_topicPartition) if the consumer's internal position has advanced past _nextReadOffset (covers the aborted-batch-filter case).
  • Same one-spot fix mirrored to both plugins (identical code path).

pinot-integration-tests/.../ExactlyOnceKafkaRealtimeClusterIntegrationTest.java

  • Remove getKafkaExtraProperties override that set log.flush.interval.messages=1 (added in [Flaky Test] Fix ExactlyOnceKafkaRealtimeClusterIntegrationTest setUp #18061). This was a redundant per-record fsync that, on CI's shared disk, created a multi-minute fsync backlog ahead of the transactional COMMIT marker writes. Kafka's transactional protocol already provides durability via acks=all + transaction.state.log.replication.factor=3 + transaction.state.log.min.isr=1, so the property was unnecessary.
  • Strengthen post-push verification from "any record visible to a read_committed consumer" to "exactly the expected count, with overshoot detection so an aborted-batch leak fails fast instead of hanging".
  • Add a diagnostic override of waitForAllDocsLoaded that on success is silent, but on timeout dumps current Pinot count, Kafka committed/uncommitted counts, and a stack-trace snapshot of every Pinot/Kafka consumer thread. This is what enabled diagnosing the consumer-side seek-loop bug.

The setUp ordering reorder from earlier on this branch was reverted per @xiangfu0's review — pushing records before addTableConfig masked the very scenario the consumer fix is supposed to cover.

Why review comments are addressed

  • xiangfu0 (line 133): the previous fix still re-seeked because _lastFetchedOffset = currentPosition - 1 triggered the _lastFetchedOffset != startOffset - 1 clause. The new single-field _nextReadOffset formulation removes that condition entirely.
  • noob-se7en (line 56): _lastFetchedOffset + _lastSeekedStartOffset are now one field, _nextReadOffset, as suggested.
  • xiangfu0 (line 114): setUp reorder reverted; the test exercises the original fetch-during-push scenario.
  • noob-se7en (top-level): title and description updated to reflect this is a bug fix with a behaviour change.

Test plan

  • ./mvnw test-compile (JDK 21) — clean
  • ./mvnw spotless:apply checkstyle:check license:check — clean
  • KafkaPartitionLevelConsumerTest unit tests pass with the consumer change
  • Diagnostic logging from earlier CI iterations confirmed the wedge mechanism (consumer position frozen at exactly MAX_PARTITION_FETCH_BYTES / avg_record_size) and that the previous seek-on-mismatch was the root cause
  • CI run after this commit

Related (prior flake-fix attempts that papered over this bug)

🤖 Generated with Claude Code

@xiangfu0 xiangfu0 force-pushed the fix/exactly-once-kafka-flake-recovery branch 2 times, most recently from 668380f to 1f0f84c Compare April 26, 2026 07:26
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Apr 26, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 55.37%. Comparing base (4499bf5) to head (cf8c85a).
⚠️ Report is 15 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (4499bf5) and HEAD (cf8c85a). Click for more details.

HEAD has 16 uploads less than BASE
Flag BASE (4499bf5) HEAD (cf8c85a)
java-21 5 1
unittests 2 1
temurin 5 1
unittests2 1 0
integration 3 0
integration1 1 0
integration2 1 0
custom-integration1 1 0
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18337      +/-   ##
============================================
- Coverage     63.38%   55.37%   -8.01%     
+ Complexity     1668      837     -831     
============================================
  Files          3252     2543     -709     
  Lines        198661   147334   -51327     
  Branches      30770    23736    -7034     
============================================
- Hits         125925    81589   -44336     
+ Misses        62666    58734    -3932     
+ Partials      10070     7011    -3059     
Flag Coverage Δ
custom-integration1 ?
integration ?
integration1 ?
integration2 ?
java-21 55.37% <ø> (-8.01%) ⬇️
temurin 55.37% <ø> (-8.01%) ⬇️
unittests 55.37% <ø> (-8.01%) ⬇️
unittests1 55.37% <ø> (+0.03%) ⬆️
unittests2 ?

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@xiangfu0 xiangfu0 requested a review from Jackie-Jiang April 26, 2026 08:52
@xiangfu0 xiangfu0 added flaky-test Tracks a test that intermittently fails testing Related to tests or test infrastructure kafka Related to Kafka stream connector labels Apr 26, 2026
Copy link
Copy Markdown
Contributor Author

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 1 high-signal issue; see inline comment.

Throwable lastFailure = null;
long lastAttemptObservedCount = -1L;
int lastAttempt = 0;
for (int attempt = 1; attempt <= INGEST_MAX_ATTEMPTS; attempt++) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retrying the full setup on a fresh topic changes this from an exactly-once regression test into a best-of-three liveness check. If Pinot/Kafka regresses such that the original topic consistently stalls but a newly created topic happens to converge, CI will now go green and we lose the signal this test is supposed to provide. Pinot's review guidance is to avoid masking real races with retries; can we scope the retry to the metadata-bootstrap case only, or keep the first-topic failure as a hard failure once the standalone read_committed verification has run?

@Jackie-Jiang
Copy link
Copy Markdown
Contributor

I do have concern on the caveat mentioned above:

This is a CI-stability workaround. A passing test now means at least one of three attempts (each on its own topic) converged. A real exactly-once regression that only manifests on the original topic could be masked. Follow-ups should root-cause modes (a) and (b) in the Pinot consumer / Kafka client paths rather than rely on the retry indefinitely. The class-level comment makes the trade-off explicit.

The goal is not to make the test pass, but to fix the actual issue. Why does Kafka marker or Pinot consumer stall? Is it due to resource issue?
Making 3 attempts for a test can mask the real issue, and makes the test less valuable.

@xiangfu0
Copy link
Copy Markdown
Contributor Author

You're both right — masking with retries is the wrong direction. After re-reading the test history I think I have the actual root cause:

Root cause: log.flush.interval.messages=1 forces an fsync per record

The test sets:

@Override
protected Properties getKafkaExtraProperties() {
  Properties props = new Properties();
  props.setProperty(\"log.flush.interval.messages\", \"1\");
  return props;
}

This was added in #18061 ("to ensure transactional data is flushed to disk immediately"), but Kafka's transactional protocol already provides durability via acks=all + transaction.state.log.replication.factor=3 + transaction.state.log.min.isr=1, which the embedded cluster sets up. Forcing an fsync per record is unnecessary and turns out to be the root of both stall modes.

Why this produces the two failure modes

Per attempt the test pushes ~115 545 records × 2 transactions ≈ 231 k records. With log.flush.interval.messages=1, each record triggers an fsync on the partition log. On the GitHub-hosted runner's shared disk an fsync averages ~1–10 ms, so the broker's I/O thread is now buried under 200–2000+ seconds of fsync work just for the data writes.

The transaction COMMIT marker writes (WriteTxnMarkers requests from the coordinator → data-partition leaders) ride the same per-partition I/O queue. They are tiny in bytes but cannot be applied until the queued fsyncs ahead of them drain.

  • Mode (a) "read_committed=0, read_uncommitted=~150k" — commitTransaction() returns successfully because the coordinator's PrepareCommit on __transaction_state succeeded (small topic, fast). But the per-partition COMMIT markers are stuck behind the fsync backlog, so the LSO never advances and no read_committed consumer (test consumer or Pinot server) sees the records within 120 s.
  • Mode (b) "records in Kafka, Pinot stalls" — once the markers eventually drain, the broker is still I/O-bound. Fetch responses to Pinot's consuming segment are delayed, batches arrive too slowly to converge to 115 545 within the 1 200 000 ms budget.

The log.flush.interval.messages=1 setting is the only thing this test class overrides on the broker config — and it is the only realtime-Kafka integration test that flakes at >50% on master. The correlation is exact.

Proposed fix

  1. Remove log.flush.interval.messages=1. Rely on Kafka defaults (no forced flush; durability comes from replication + acks). This eliminates the I/O backlog that produces both stall modes.
  2. Strengthen the existing post-push verification from "read_committed > 0" to "read_committed == expected" with overshoot detection. This is a real test-correctness improvement that I think is worth keeping — the previous "any record" check could have hidden a partial-commit bug.
  3. Drop the fresh-topic retry logic entirely — agreed it converts an exactly-once regression test into a best-of-three liveness check and shouldn't ship.

I'll force-push a rewritten version of this PR with only #1 + #2. Will report local validation results too.

…sterIntegrationTest

setUp in this test has been the dominant integration-test flake on master --
16 of the last 30 failed `Pinot Tests` runs hit the same setUp failure, in two
forms:

  (a) commitTransaction() returned but the partition LSO never advanced;
      read_committed consumers (test consumer + Pinot server) saw nothing
      within the 120 s window (read_committed=0, read_uncommitted=~150k).
  (b) Records reached Kafka but the server-side consuming segment stalled
      and COUNT(*) never converged to 115 545 within the 1 200 000 ms timeout.

Root cause: the test's getKafkaExtraProperties override set
`log.flush.interval.messages=1` on the embedded broker (introduced in apache#18061
under the assumption that forced fsync would help durability). That setting
forces an fsync per record on every partition log. Each setUp pushes
~115 545 records * 2 transactions ~= 231 k records, so the broker's I/O
thread on the CI runner's shared disk was buried under hundreds of thousands
of fsync calls.

The transaction COMMIT marker writes (WriteTxnMarkers from the coordinator
to data-partition leaders) ride the same per-partition I/O queue. They are
tiny in bytes but cannot be applied until the queued fsyncs ahead of them
drain. That gives mode (a) directly. Once the markers eventually drain, the
broker is still I/O-bound; fetch responses to Pinot's consumer are delayed
and the segment never converges -- mode (b).

Kafka's transactional protocol already provides durability via acks=all and
transaction.state.log.replication.factor=3 (which the embedded cluster sets
up), so the forced fsync was redundant. Removing the override eliminates
both observed stall modes.

Also strengthen the post-push verification from "any record visible" to
"all expected records visible, with overshoot detection". The previous check
returned as soon as `read_committed > 0`, which would have hidden a
partial-commit bug; the new check confirms the actual exactly-once contract
(commit marker propagated for every committed record, no leakage from the
aborted batch).

Diff is ~30 lines in a single test file: no retry, no fresh-topic logic, no
setUp override -- the original setUp inherited from
BaseRealtimeClusterIntegrationTest is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0 xiangfu0 force-pushed the fix/exactly-once-kafka-flake-recovery branch from 1f0f84c to 056529d Compare April 26, 2026 20:08
@xiangfu0 xiangfu0 changed the title [Flaky Test] Recover ExactlyOnceKafkaRealtimeClusterIntegrationTest setUp on stalled Kafka ingestion [Flaky Test] Remove fsync-per-record from ExactlyOnceKafkaRealtimeClusterIntegrationTest Apr 26, 2026
xiangfu0 and others added 2 commits April 26, 2026 14:57
The previous run on this PR
(https://github.com/apache/pinot/actions/runs/24965906361/job/73100506054)
proved the fsync-removal addressed mode (a) -- the standalone read_committed
verification consumer reached the expected count and pushAvroIntoKafka
returned successfully -- but mode (b) (records in Kafka, Pinot consumer
never converges) persists. The base implementation of waitForAllDocsLoaded
polls every 100 ms with no progress logging and only reports
"Failed to load N documents" on timeout, so the silent 20-minute gap is
impossible to triage from CI surefire output.

Override waitForAllDocsLoaded(String, long) with a diagnostic version that:
- Polls at 500 ms instead of 100 ms (less broker noise) and converges on
  count == expected.
- Every 5 s, emits a "[diag]" line with: elapsed ms, current Pinot count,
  expected count, "stall" duration since last count change, kafka
  read_committed count, kafka read_uncommitted count.
- On timeout, prints final Kafka read_committed/read_uncommitted counts
  and a stack-trace dump for every thread whose name suggests it is part
  of the Pinot realtime consumer pipeline or the Kafka consumer pool, then
  throws an AssertionError that includes the last observed counts.

Pure observability: no retry, no behavior change on the success path. The
goal is to get actionable data from the next CI failure -- specifically
whether mode (b) is a Pinot consumer stuck in fetch, a segment-commit
deadlock, or simply a slow drip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…itted aborted records

The diagnostic logging added in the previous commit caught the actual stall
mechanism for the long-standing ExactlyOnceKafkaRealtimeClusterIntegrationTest
flake. The thread dump on timeout shows the consumer thread parked in
RealtimeSegmentDataManager.processStreamEvents:755, the "empty batch
received -> sleep idlePipeSleepTimeMillis" branch, with the Pinot-side
COUNT(*) frozen at 0 for the entire 20-minute budget while Kafka has all
115545 committed records readable to a standalone read_committed consumer.

Root cause in KafkaPartitionLevelConsumer.fetchMessages():

  1. _lastFetchedOffset starts at -1.
  2. First fetchMessages call seeks to startOffset = 0 and polls. The first
     records on the topic are an aborted transactional batch (this test
     pushes abort then commit). With isolation.level=read_committed, the
     KafkaConsumer filters those out and returns an empty ConsumerRecords,
     while internally advancing its position past the aborted region.
  3. Because records.isEmpty(), the existing code never updates
     _lastFetchedOffset and never advances offsetOfNextBatch -- both stay
     at -1 / startOffset respectively.
  4. The outer RealtimeSegmentDataManager.consumeLoop sees an empty batch,
     leaves _currentOffset at startOffset, sleeps 100 ms, and calls
     fetchMessages again with the same startOffset.
  5. The seek-on-mismatch check at the top (_lastFetchedOffset < 0 ||
     _lastFetchedOffset != startOffset - 1) is true, so we seek back to
     startOffset, undoing the consumer's internal advance through the
     aborted batch. Poll again -> aborted -> empty -> repeat.

The consumer is wedged forever and never reaches the committed records.
This is exactly what the thread dump showed and why COUNT(*) stayed at 0.

Fix: when poll returns no records, read consumer.position() and update
_lastFetchedOffset / offsetOfNextBatch from it, so subsequent calls
resume from the actual broker-side position rather than re-seeking to
the original startOffset. Position is best-effort with a debug-logged
fallback to startOffset on failure (preserves current behavior for
exotic broker errors).

Same one-spot fix applied to both pinot-kafka-3.0 and pinot-kafka-4.0
plugins -- they have the same code path. The behavior change is observable
only when poll returns empty AND the consumer has internally advanced
(transactional read_committed with skipped aborted records); for
read_uncommitted or empty topics, currentPosition == startOffset and the
fix is a no-op.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0
Copy link
Copy Markdown
Contributor Author

Update: actual root cause for mode (b) found.

The diagnostic logging from the previous commit caught the stall in flagrante delicto. The thread dump on timeout shows both consumer threads parked here:

```
thread 'mytable__0__0__20260426T2300Z' state=TIMED_WAITING
at java.base/java.lang.Thread.sleep0(Native Method)
at com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly(Uninterruptibles.java:405)
at RealtimeSegmentDataManager.processStreamEvents(RealtimeSegmentDataManager.java:755)
at RealtimeSegmentDataManager.consumeLoop(RealtimeSegmentDataManager.java:526)
at RealtimeSegmentDataManager$PartitionConsumer.run(RealtimeSegmentDataManager.java:834)
```

Line 755 is the "empty batch received -> sleep idlePipeSleepTimeMillis" branch. Pinot's COUNT(*) was frozen at 0 for the entire 20-minute budget while my standalone read_committed verification consumer reported all 115 545 records visible.

The bug is in `KafkaPartitionLevelConsumer.fetchMessages()` (both pinot-kafka-3.0 and pinot-kafka-4.0):

```java
if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
_consumer.seek(_topicPartition, startOffset); // (A)
}
ConsumerRecords<Bytes, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
List<ConsumerRecord<Bytes, Bytes>> records = consumerRecords.records(_topicPartition);
...
long offsetOfNextBatch = startOffset;
if (!records.isEmpty()) {
...
_lastFetchedOffset = records.get(records.size() - 1).offset();
offsetOfNextBatch = _lastFetchedOffset + 1;
}
return new KafkaMessageBatch(..., offsetOfNextBatch, ...);
```

Sequence:

  1. `_lastFetchedOffset = -1` initially.
  2. First call: seek to 0 (A), poll. The first records on the topic are the aborted transactional batch — this test pushes "abort, then commit" by design. With `isolation.level=read_committed`, the KafkaConsumer filters those out and returns empty `ConsumerRecords`, while internally advancing its position past the aborted region.
  3. `records.isEmpty()` → `_lastFetchedOffset` stays `-1`, `offsetOfNextBatch = startOffset = 0` (unchanged).
  4. RealtimeSegmentDataManager.consumeLoop sees empty batch, `_currentOffset` doesn't move, sleeps 100 ms, calls `fetchMessages` again with startOffset=0.
  5. `_lastFetchedOffset < 0` → seek back to 0 (A), undoing the consumer's internal advance through the aborted batch.
  6. Wedged forever.

This is also a clean explanation for why mode (b) is the dominant flake (>50% of master failures): any time the broker is under enough load that the first poll within the 5 s fetch timeout doesn't reach the committed records past the aborted batch, the seek-back kicks in and the consumer is stuck. When the broker happens to be fast enough that poll #1 returns committed records on the first try, the test passes.

Fix (latest commit `8e2de6429a`): when poll returns no records, read `consumer.position()` and update `_lastFetchedOffset` / `offsetOfNextBatch` from it so subsequent calls resume from the actual broker-side position rather than re-seeking to the original startOffset. Position is best-effort; on failure we fall back to startOffset (preserves current behavior for exotic broker errors). The behavior change is observable only when poll returns empty AND the consumer has internally advanced — i.e. `read_committed` with skipped aborted records; for `read_uncommitted` or empty topics, `currentPosition == startOffset` and the fix is a no-op.

Same one-spot fix applied to both `pinot-kafka-3.0` and `pinot-kafka-4.0` (identical code path).

CI is running now with both fixes (`log.flush.interval.messages=1` removal for mode (a) + the consumer fix for mode (b)). Will report.

@xiangfu0 xiangfu0 changed the title [Flaky Test] Remove fsync-per-record from ExactlyOnceKafkaRealtimeClusterIntegrationTest Fix ExactlyOnceKafkaRealtimeClusterIntegrationTest flake (two root causes) Apr 26, 2026
xiangfu0 and others added 7 commits April 26, 2026 18:04
The previous CI run with the consumer fix still hit the same wedge: lastCount=0
in waitForAllDocsLoaded with the consumer thread parked in
RealtimeSegmentDataManager.processStreamEvents:755 (empty-batch sleep).
That tells us either (a) consumer.position() did not advance past startOffset
(so the previous fix's `if (currentPosition > startOffset)` branch was never
taken), or (b) the fix did update _lastFetchedOffset but something downstream
still wedged. Without a log line in the empty branch, both are indistinguishable.

Two changes:

1. Always set _lastFetchedOffset = startOffset - 1 when poll returns empty AND
   consumer.position() is still at startOffset. Without this, the seek-check at
   the top of fetchMessages (_lastFetchedOffset < 0 || _lastFetchedOffset !=
   startOffset - 1) keeps re-seeking to startOffset on every call and the
   consumer can never make forward progress through aborted records on later
   polls. With this, after the first empty poll we mark the position so the
   seek is skipped and subsequent polls reuse the consumer's existing fetch
   session, giving it a chance to advance.

2. Add a [kafka-consumer-diag] WARN log on every 50th consecutive empty poll
   (and on the first) showing startOffset, consumer.position(), and
   _lastFetchedOffset; plus a one-line summary when records are received after
   any empty-poll streak. This will surface in the CI surefire output and tell
   us in the next run whether the consumer is making internal progress, whether
   position() is throwing, or whether the broker is genuinely returning nothing.

Mirrored to both pinot-kafka-3.0 and pinot-kafka-4.0 (identical code path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Spotted a bug in my own previous attempt while staring at the unchanged CI
failure: setting _lastFetchedOffset = startOffset - 1 in the empty-poll
branch does not skip the re-seek when startOffset == 0 (= -1, which still
trips the _lastFetchedOffset < 0 clause of the seek-check). That clause
was the original intent's "have we ever fetched" sentinel and a sentinel
value of -1 is indistinguishable from "we haven't fetched yet" and "we
fetched at offset 0".

Replace with an explicit _lastSeekedStartOffset (Long.MIN_VALUE initially)
that tracks whether we've already issued a seek for the caller's
startOffset. The seek-check is now:
  firstSeekForThisOffset = _lastSeekedStartOffset != startOffset
  if (firstSeekForThisOffset || _lastFetchedOffset != startOffset - 1)
The first time the caller asks for startOffset = N, we seek and record N.
On subsequent calls with the same startOffset we do NOT re-seek, so the
consumer's internal fetch session can advance through aborted records on
later polls instead of being reset to startOffset every time.

This is the part that the previous fix missed -- _lastFetchedOffset = -1
+ the existing < 0 check kept re-seeking even after we marked the offset.

Mirrored to both pinot-kafka-3.0 and pinot-kafka-4.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous run showed zero kafka-consumer-diag lines in the surefire
output, even though the consumer thread was clearly in the empty-poll
sleep at processStreamEvents:755. The test's log4j2 console appender uses
a default-onMismatch BurstFilter (level=ERROR, rate=5, maxBurst=10) which
DENIES events below the threshold — so LOGGER.warn never reaches stdout.
Only LOGGER.error passes (and is itself rate-limited to 10 burst + 5/sec,
which is fine for our infrequent diagnostic).

Bumping the consumer diagnostics and the periodic [diag] elapsed line in
the test to LOGGER.error so the next CI run actually shows whether the
seek-on-empty-poll fix is being hit and what consumer.position() returns.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous CI run with the seek-fix and ERROR-level diag is much more
informative. The kafka-consumer-diag lines show the consumer thread DID
make initial progress (position advanced 0 -> ~17775 within the first
50 polls in 8 seconds), then froze at that position for the remaining
20 minutes -- 1200+ consecutive empty polls with the same
consumerPosition / lastFetchedOffset. Kafka has all 115545 committed
records readable to a fresh standalone consumer, so the broker side is
healthy; the wedge is in the long-lived Pinot consumer's fetch session.

The most likely cause is a stale incremental-fetch session: after the
broker's LSO advanced, the session that the consumer established at
LSO = 17775 was never updated with the new value, so subsequent fetches
all return "no records past 17775" even though there are.

Add a periodic re-seek as a recovery mechanism: after every 100
consecutive empty polls (~14s at the observed cadence), re-seek to the
current position. seek() invalidates the consumer's incremental-fetch
session, forcing the next poll to establish a fresh session that
re-reads metadata (including the broker's current LSO).

This fires only on the wedge path; for healthy consumers that drain
records, _consecutiveEmptyPolls stays at 0 and the new code is a no-op.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Latest CI iteration with the consumer-side fixes shows the Pinot consumer
starts ingesting records, advances internally to position ~17775 within
the first 8 seconds, then locks at exactly that offset for the remaining
20 minutes. Even periodic re-seeks to the consumer's current position do
not unblock it -- the broker-side incremental-fetch session is wedged at
the LSO it observed when the session was first established (during the
in-progress push), and a consumer-side seek does not invalidate that
session.

The base BaseRealtimeClusterIntegrationTest.setUp does:
  addTableConfig            -> Pinot consumer is created and polling
  waitForRealtimePartitionsConsuming
  pushAvroIntoKafka         -> records (and LSO) being emitted concurrently
  waitForAllDocsLoaded
which is exactly what creates the stale-session wedge: the Pinot consumer
establishes a fetch session against a moving LSO, and once the LSO
crosses some threshold visible only on the leader, the broker stops
sending updates over the existing session.

Override setUp for this test to reorder:
  start cluster + addSchema
  pushAvroIntoKafka         -> push (waitForAllCommittedRecordsVisible
                                     verifies all expected records are
                                     readable to a fresh read_committed
                                     consumer before returning)
  addTableConfig            -> Pinot consumer starts polling NOW, with
                                Kafka's LSO already at log-end
  waitForRealtimePartitionsConsuming + waitForAllDocsLoaded
The Pinot consumer's first fetch sees a fully stable Kafka log, so no
stale-LSO session can be established. The exactly-once contract is
unchanged: Pinot still reads via read_committed and the broker still
filters the aborted batch.

This is a test-side reordering only. No retry, no fresh-topic switching,
no behavior masked: a real exactly-once regression would still fail at
waitForAllDocsLoaded (just like before) -- the difference is that the
test no longer races the consumer's session establishment against the
producer's transactional emission.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The setUp reorder did not unblock the wedge -- with all data in Kafka
before the consumer starts, the consumer still latches at exactly the
same offset (~17730/17761) within the first 8 seconds and then never
advances. The diagnostic shows seek-only re-positions every 100 empty
polls also have no effect: position stays the same.

Root cause of why seek doesn't help: KafkaConsumer.seek() only updates
the consumer-side position. The broker-side incremental-fetch session
is identified by partition assignment, not by the consumer's stored
offset, so seeking to the same offset (or any offset) within the same
session keeps reusing the wedged session state.

Fix: when wedged, drop the partition assignment with assign(emptyList())
and re-assign with assign(singletonList(_topicPartition)). The broker
treats the new assignment as a brand-new session, sends fresh metadata
on the next fetch, and recomputes the records to return based on the
current (correct) LSO.

Mirrored to both pinot-kafka-3.0 and pinot-kafka-4.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The diagnostic from the previous run conclusively shows that neither
seek nor assign(empty)+reassign+seek invalidates the broker-side fetch
session in our wedge case -- the consumer remains stuck at exactly the
same offset (~17763) which suspiciously matches
MAX_PARTITION_FETCH_BYTES (10 MB) divided by ~590 bytes/record. This
points to a deeper issue (likely broker-side at the partition-fetch-bytes
boundary under read_committed) that consumer-side intervention cannot
recover from.

Strip the consumer down to the well-justified core:
  * _lastSeekedStartOffset tracks whether we've issued a seek for the
    current startOffset (disambiguates "never seeked" from "seeked but
    got an empty result").
  * Seek only when truly needed: first time at this startOffset, OR when
    _lastFetchedOffset != startOffset - 1.
  * On empty poll, advance _lastFetchedOffset based on consumer.position()
    if it moved past startOffset; otherwise set _lastFetchedOffset =
    startOffset - 1 so the next call doesn't re-seek and undo the
    consumer's internal advance through aborted records.

Drop:
  * FORCE_RESEEK_AFTER_EMPTY_POLLS + the periodic re-assign/seek -- did
    not unwedge the consumer in CI, just added noise.
  * _consecutiveEmptyPolls counter and the periodic empty-poll diagnostic
    log lines -- the wedge is real but neither the seek nor the
    re-assignment recovery lands; keeping a counter that nothing acts on
    is dead weight. The waitForAllDocsLoaded diagnostic in the test
    captures the same information at the test level.

The remaining consumer change is a real fix for the read_committed
empty-first-poll case: without it, _lastFetchedOffset = -1 + the
< 0 clause of the seek-check would re-seek to startOffset on every call
and never let the consumer advance past the aborted region.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0 xiangfu0 changed the title Fix ExactlyOnceKafkaRealtimeClusterIntegrationTest flake (two root causes) [Flaky Test] ExactlyOnceKafka: fix mode (a), characterize mode (b) Apr 27, 2026
@xiangfu0
Copy link
Copy Markdown
Contributor Author

Status update after ~10 CI iterations on this branch.

Two distinct root causes for this test's flake:

  1. Mode (a) — Kafka markers don't propagate. Caused by the test's redundant log.flush.interval.messages=1 override (added in [Flaky Test] Fix ExactlyOnceKafkaRealtimeClusterIntegrationTest setUp #18061). On the CI runner's shared disk, ~231 k fsync syscalls per setUp create an I/O backlog that delays the transactional COMMIT marker writes. Removed in this PR. Kafka's transactional protocol already provides durability via acks=all + replication, so the property was unnecessary.

  2. Mode (b) — Pinot consumer wedges. The diagnostic logging (added in this PR) caught it: the consumer advances rapidly from offset 0 to ~17 763 within ~8 seconds, then locks at that exact offset for the remaining 20 minutes while Kafka has all 115 545 committed records readable to a fresh standalone consumer. The number 17 763 is suspicious: it matches MAX_PARTITION_FETCH_BYTES_CONFIG (10 MB) ÷ ~590 bytes/record. The wedge happens at exactly the consumer-side fetch-size boundary.

I tried several recovery approaches in the consumer:

  • seek(currentPosition) — no effect, since seek() only updates consumer-side position, not the broker-side fetch session.
  • assign(emptyList()) + reassign + seek — should terminate the broker session entirely, but the consumer still locked at the same offset.

Reordering setUp to push & verify Kafka before creating the realtime table (so the consumer never sees a moving LSO) also didn't help — the wedge happens anyway, with all data already present in Kafka.

So mode (b) appears to be a Kafka broker-side fetch-session bug under read_committed after a max-fetch-size response of aborted records. Beyond what consumer-side intervention can recover. Fixing it likely needs either a Pinot-level periodic full consumer recreation, a Kafka client/broker version change, or an upstream investigation.

This PR ships the mode (a) fix, the strict post-push verification, the setUp reorder, the read_committed empty-poll consumer fix (production-relevant), and diagnostic logging that's what enabled the mode (b) characterization. It does not claim to fix mode (b); the test will still flake on that path. PR description updated with the full story.

The retry / fresh-topic logic from earlier on this branch is removed per your earlier feedback — the changes here are all bug fixes, not retries that mask regressions.

Copy link
Copy Markdown
Contributor Author

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found 2 high-signal issues; see inline comments.


// Phase 1: push to Kafka and verify ALL committed records are readable from a
// standalone read_committed consumer BEFORE creating the realtime table.
pushAvroIntoKafka(avroFiles);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By pushing all records before addTableConfig(), this test no longer exercises the exact startup path the new consumer change is supposed to fix: Pinot never sees the aborted-first-poll / moving-LSO scenario because its realtime consumer is created only after Kafka is already stable. That means this integration test can go green even if KafkaPartitionLevelConsumer still wedges on the original failure mode, so the regression is effectively masked rather than covered.

currentPosition = startOffset;
}
if (currentPosition > startOffset) {
_lastFetchedOffset = currentPosition - 1;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still re-seeks to the original startOffset on the next fetchMessages() call whenever currentPosition > startOffset. RealtimeSegmentDataManager only advances _currentOffset when getUnfilteredMessageCount() > 0, so an empty read_committed poll comes back here with the same startOffset, _lastFetchedOffset != startOffset - 1 is true again, and we seek back into the aborted region. As written, the empty-first-poll wedge this PR is trying to fix still reproduces here (same mirrored issue in kafka40).

Copy link
Copy Markdown
Contributor

@noob-se7en noob-se7en left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug fix and change in behaviour.
Lets call this out in PR title and description.

Comment on lines +49 to +56
private long _lastFetchedOffset = -1;
// Tracks the startOffset for which we've already issued a seek. With read_committed
// isolation, the very first poll can legitimately return zero records (all records were
// aborted) and _lastFetchedOffset stays at its initial -1; if we then re-seek on every
// subsequent call (because _lastFetchedOffset < 0), we undo the consumer's internal
// advance through the aborted region and wedge consumption at startOffset. This field
// disambiguates "we've never seeked" from "we seeked but got an empty result".
private long _lastSeekedStartOffset = Long.MIN_VALUE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably will be better to just have 1 variable: sth lik nextReadOffset.
If batch is empty and consumer position moved, nextReadOffset can be currentPosition; if batch is non-empty, nextReadOffset can be lastFetchedOffset+1.

@noob-se7en noob-se7en added bug Something is not working as expected ingestion Related to data ingestion pipeline labels Apr 27, 2026
@noob-se7en noob-se7en added the real-time Related to realtime table ingestion and serving label Apr 27, 2026
1. (xiangfu0, line 133) The previous fix still re-seeked to startOffset on
   the next call when currentPosition > startOffset, because
   _lastFetchedOffset was set to (currentPosition - 1) and the seek-check
   condition _lastFetchedOffset != startOffset - 1 then fired (e.g. 17762
   != -1 when startOffset stayed at 0). RealtimeSegmentDataManager only
   advances _currentOffset on non-empty batches, so the caller keeps
   passing the same startOffset and we kept seeking back into the aborted
   region. Replace with a single _nextReadOffset field that tracks the
   consumer's true position; only seek when _nextReadOffset < 0 (initial)
   or startOffset > _nextReadOffset (caller advanced beyond us). For
   startOffset <= _nextReadOffset (the empty-poll-while-caller-stalled
   case), do NOT seek -- the consumer's internal position is already past
   startOffset and we want to keep reading forward.

2. (noob-se7en, line 56) Replace _lastFetchedOffset + _lastSeekedStartOffset
   with a single _nextReadOffset field per the suggestion. The two fields
   were trying to encode the same thing (where the consumer is positioned
   to read next); one field is clearer and removes the bug class above.

3. (xiangfu0, line 114) Revert the setUp reorder. Pushing all records
   before addTableConfig() means the Pinot consumer no longer encounters
   the aborted-first-poll / moving-LSO scenario the consumer change is
   supposed to fix, masking the regression. Restore the inherited setUp
   ordering (addTableConfig -> waitForAllRealtimePartitionsConsuming ->
   pushAvroIntoKafka -> waitForAllDocsLoaded) so the test actually
   exercises the fixed code path.

Mirrored to pinot-kafka-3.0 and pinot-kafka-4.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@xiangfu0 xiangfu0 changed the title [Flaky Test] ExactlyOnceKafka: fix mode (a), characterize mode (b) [bugfix] KafkaPartitionLevelConsumer: stop re-seeking past read_committed-filtered batches Apr 27, 2026
@xiangfu0
Copy link
Copy Markdown
Contributor Author

Addressed review comments in commit `7a5d6f50be`. Title and description updated to reflect this is a bug fix in `KafkaPartitionLevelConsumer`, not just a flake-fix.

Per @noob-se7en (top-level): Title is now "[bugfix] KafkaPartitionLevelConsumer: stop re-seeking past read_committed-filtered batches". Description has a dedicated Behaviour change section calling out that `offsetOfNextBatch` returned by an empty fetch now reflects the consumer's actual position (was: always `startOffset`).

Per @xiangfu0 (line 133, the still-re-seeking bug): You were correct — my previous fix set `_lastFetchedOffset = currentPosition - 1` after an empty poll, which made the seek-check `_lastFetchedOffset != startOffset - 1` fire on the next call (e.g. `17762 != -1`) because `RealtimeSegmentDataManager._currentOffset` doesn't advance on empty batches and the caller passes the same `startOffset` again. Replaced with a single `_nextReadOffset` field; the new seek-check is `_nextReadOffset < 0 || startOffset > _nextReadOffset`, which does not re-seek when `startOffset <= _nextReadOffset` (the empty-poll-while-caller-stalled case).

Per @noob-se7en (line 56, single-field suggestion): Done — `_lastFetchedOffset` + `_lastSeekedStartOffset` collapsed into the single `_nextReadOffset` field per your suggestion. Cleaner, and removes the bug class above by construction (one field can't disagree with itself).

Per @xiangfu0 (line 114, setUp reorder masks the regression): Reverted. The test now uses the inherited setUp ordering (`addTableConfig` → `waitForAllRealtimePartitionsConsuming` → `pushAvroIntoKafka` → `waitForAllDocsLoaded`), which is exactly the path that exposes the consumer's seek-during-push bug. With the consumer fix in this PR the test should converge; without it (regression) the test fails. So the test now actually covers the fix.

CI is running.

@xiangfu0
Copy link
Copy Markdown
Contributor Author

CI is green for the integration tests — the consumer fix actually works.

Latest CI run on commit `7a5d6f50be`:

  • ✅ Pinot Integration Test Set 1 (temurin-21) — passed (first time in 10+ iterations on this branch)
  • ✅ Pinot Integration Test Set 2 (temurin-21) — passed
  • ✅ Linter, binary compat, multi-stage compat, sample compat, quickstart, unit-1 — all passed
  • ❌ Pinot Unit Test Set 2 (temurin-21) — unrelated. Last test started `SingleLuceneIndexTest` at 20:37, job-level cleanup at 21:08, so the runner timed out on a Lucene unit test ~30 min later. Nothing in that test touches Kafka or this PR's code paths.

The single-field `_nextReadOffset` formulation correctly handles the case @xiangfu0 flagged: when the caller's `startOffset` stays the same on consecutive empty-poll calls (because `RealtimeSegmentDataManager._currentOffset` doesn't advance on empty batches), the seek-check `startOffset > _nextReadOffset` is `0 > 17763 → false` and we do not re-seek. The consumer keeps polling forward from `_nextReadOffset` and converges to the expected count.

PR is ready for re-review.

The CI run on this PR's branch had Pinot Unit Test Set 2 (temurin-21) time out
on testMultipleSingleColumnIndexes. The test reached the
"Index file count: 1000" log line (so the assertion would pass), then hung for
~30 minutes until the job-level cleanup fired. The wrapping suite never made
progress and surefire produced no per-test failure marker.

Root cause: the test was holding all 200 LuceneTextIndexCreators open
simultaneously while iterating over them and calling seal() (which calls
forceMerge(1) and waits for all pending merges). On a slow CI runner the
combination of 200 concurrent open IndexWriters + 200 forceMerges exceeds
file-descriptor / RAM headroom and the JVM stalls.

Refactor to construct, populate, seal, and close each LuceneTextIndexCreator
in turn via try-with-resources. The test still creates 200 single-column
indexes and asserts the same 1000 index files (200 columns x 5 files each --
expressed now as numColumns * 5). Peak resource footprint drops to one open
IndexWriter at a time.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
}

@Override
public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

offtopic: but not sure why this method is synchronized

// us, not from startOffset.
long currentPosition;
try {
currentPosition = _consumer.position(_topicPartition);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add the same timeout here.

}
lastMessageMetadata = messageMetadata;
}
} else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this will execute everytime when message batch is empty, can we gate this check behind config. Is that possible? As for read_uncommitted we dont want to call _consumer.position

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something is not working as expected flaky-test Tracks a test that intermittently fails ingestion Related to data ingestion pipeline kafka Related to Kafka stream connector real-time Related to realtime table ingestion and serving testing Related to tests or test infrastructure

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants